package day03.window;

import beans.SensorReading;
import custom.MyAverageAggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink 流处理 API - CountWindow
 *
 * @author lvbingbing
 * @date 2022-01-01 13:05
 */
public class FlinkWindow02 {
    public static void main(String[] args) throws Exception {
        // 1、创建 FlinkWindow00 对象，有参构造会初始化 env，并从socket文本流中读取数据
        int parallelism = 1;
        FlinkWindow00 flinkWindow = new FlinkWindow00(parallelism);
        // 2、获取可执行环境
        StreamExecutionEnvironment env = flinkWindow.getEnv();
        // 3、学习计数窗口
        studyCountWindow(flinkWindow.getSingleOutputStreamOperator());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 学习计数窗口
     * <p>
     * 1、滚动窗口(Tumbling Window)
     * 2、滑动窗口(Sliding Window)
     * <p>
     *
     * @param sensorReadingStream <br>
     */
    private static void studyCountWindow(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // 1、Tumbling Window
        studyTumblingWindow(sensorReadingStream);
        // 2、Sliding Window
        studySlidingWindow(sensorReadingStream);
    }

    /**
     * 滚动计数窗口
     *
     * @param sensorReadingStream <br>
     */
    private static void studyTumblingWindow(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        SingleOutputStreamOperator<Double> tumblingWindowDs = sensorReadingStream.keyBy("id")
                .countWindow(10)
                .aggregate(new MyAverageAggregateFunction());
        tumblingWindowDs.print("滚动计数窗口");
    }

    /**
     * 滑动计数窗口
     *
     * @param sensorReadingStream <br>
     */
    private static void studySlidingWindow(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        SingleOutputStreamOperator<Double> slidingWindowsDs = sensorReadingStream.keyBy("id")
                .countWindow(10, 5)
                .aggregate(new MyAverageAggregateFunction());
        slidingWindowsDs.print("滑动计数窗口");
    }
}
